package com.ecommerce.service.impl;

import com.alibaba.fastjson.JSON;
import com.ecommerce.account.AddressInfo;
import com.ecommerce.account.BalanceInfo;
import com.ecommerce.common.TableId;
import com.ecommerce.dao.EcommerceOrderDao;
import com.ecommerce.entity.EcommerceOrder;
import com.ecommerce.feign.AddressClient;
import com.ecommerce.feign.NotSecuredBalanceClient;
import com.ecommerce.feign.NotSecuredGoodsClient;
import com.ecommerce.feign.SecuredGoodsClient;
import com.ecommerce.filter.AccessContext;
import com.ecommerce.goods.DeductGoodsInventory;
import com.ecommerce.goods.SimpleGoodsInfo;
import com.ecommerce.order.LogisticsMessage;
import com.ecommerce.order.OrderInfo;
import com.ecommerce.service.OrderService;
import com.ecommerce.source.LogisticsSource;
import com.ecommerce.vo.PageSimpleOrderDetail;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * 订单相关服务接口实现
 */
@Slf4j
@Service
@EnableBinding(LogisticsSource.class)
@SuppressWarnings("all")
public class OrderServiceImpl implements OrderService {

    // 表的 Dao 接口
    private final EcommerceOrderDao ecommerceOrderDao;

    // Feign 接口
    private final AddressClient addressClient;
    private final SecuredGoodsClient securedGoodsClient;
    private final NotSecuredGoodsClient notSecuredGoodsClient;
    private final NotSecuredBalanceClient notSecuredBalanceClient;

    // SpringCloud Stream的发射器
    private final LogisticsSource logisticsSource;

    public OrderServiceImpl(
            EcommerceOrderDao ecommerceOrderDao,
            AddressClient addressClient,
            SecuredGoodsClient securedGoodsClient,
            NotSecuredGoodsClient notSecuredGoodsClient,
            NotSecuredBalanceClient notSecuredBalanceClient,
            LogisticsSource logisticsSource
    ) {
        this.ecommerceOrderDao = ecommerceOrderDao;
        this.addressClient = addressClient;
        this.securedGoodsClient = securedGoodsClient;
        this.notSecuredGoodsClient = notSecuredGoodsClient;
        this.notSecuredBalanceClient = notSecuredBalanceClient;
        this.logisticsSource = logisticsSource;
    }

    /**
     * 创建订单, 涉及到分布式事务
     * 创建订单步骤较多, 当不满足情况时直接抛出异常：
     *  1. 校验请求是否合法
     *  2. 创建订单
     *  3. 扣减商品库存
     *  4. 扣减用户余额
     *  5. 发送订单物流消息 SpringCloud Stream + kafka
     */
    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public TableId createOrder(OrderInfo orderInfo) {
        // 获取地址信息
        AddressInfo addressInfo = addressClient.getAddressInfoByTableId(
                new TableId(Collections.singletonList(new TableId.Id(orderInfo.getUserAddressId())))
        ).getData();

        // 1. 校验请求是否合法（商品信息不需要校验，留至扣减库存）
        if (CollectionUtils.isEmpty(addressInfo.getAddressItems())) {
            throw new RuntimeException("user address is not exist: "+ orderInfo.getUserAddressId());
        }

        // 2. 创建订单
        EcommerceOrder newOrder = ecommerceOrderDao.save(
                new EcommerceOrder(
                        AccessContext.getLoginUserInfo().getId(),
                        orderInfo.getUserAddressId(),
                        JSON.toJSONString(orderInfo.getOrderItems())
                )
        );
        log.info("create order success: [{}], [{}]",
                AccessContext.getLoginUserInfo().getId(), newOrder.getId());

        // 3. 扣减商品库存
        if (!notSecuredGoodsClient.deductGoodsInventory(
                orderInfo.getOrderItems().stream().map(
                        OrderInfo.OrderItem::toDeductGoodsInventory
                ).collect(Collectors.toList())
        ).getData()) {
            throw new RuntimeException("deduct goods inventory failure");
        }

        // 4. 扣减用户账户余额
        // 4.1 获取商品信息, 计算总价格
        List<SimpleGoodsInfo> goodsInfos = notSecuredGoodsClient.getSimpleGoodsInfoByTableId(
                new TableId(
                        orderInfo.getOrderItems().stream().map(
                                orderItem -> new TableId.Id(orderItem.getGoodsId())
                        ).collect(Collectors.toList())
                )
        ).getData();
        Map<Long, SimpleGoodsInfo> goodsIdToGoodsInfo = goodsInfos.stream()
                .collect(Collectors.toMap(SimpleGoodsInfo::getId, Function.identity()));
        long balance = 0;
        for (OrderInfo.OrderItem orderItem : orderInfo.getOrderItems()) {
            balance += goodsIdToGoodsInfo.get(orderItem.getGoodsId()).getPrice()
                    * orderItem.getCount();
        }
        assert balance > 0;

        // 4.2 填写总价格, 扣减账户余额
        BalanceInfo balanceInfo = notSecuredBalanceClient.deductBalance(
                new BalanceInfo(AccessContext.getLoginUserInfo().getId(), balance)
        ).getData();
        if (balanceInfo == null) {
            throw new RuntimeException("deduct user balance failure");
        }
        log.info("deduct user balance: [{}], [{}]", newOrder.getId(), JSON.toJSONString(balanceInfo));

        // 5. 发送订单物流信息 SpringCloud Stream + kafka
        LogisticsMessage logisticsMessage = new LogisticsMessage(
                AccessContext.getLoginUserInfo().getId(),
                newOrder.getId(),
                orderInfo.getUserAddressId(),
                null // 无备注信息
        );

        if (!logisticsSource.logisticsOutput().send(
                MessageBuilder.withPayload(JSON.toJSONString(logisticsMessage)).build()
        )) {
            throw new RuntimeException("send logistics message faliure");
        }
        log.info("send create order message to kafka with stream: [{}]",
                JSON.toJSONString(logisticsMessage));

        // 返回订单 id
        return new TableId(Collections.singletonList(new TableId.Id(newOrder.getId())));
    }

    /**
     * 分页用户订单接口
     */
    @Override
    public PageSimpleOrderDetail getSimpleOrderDetailByPage(int page) {
        if (page <= 0) {
            page = 1; // 默认是第一页
        }

        // 这里分页规则：一页10条数据, 按照 id 倒序排列
        Pageable pageable = PageRequest.of(page - 1, 10, Sort.by("id").descending());

        Page<EcommerceOrder> orderPage = ecommerceOrderDao.findAllByUserId(
                AccessContext.getLoginUserInfo().getId(), pageable
        );

        List<EcommerceOrder> ecommerceOrders = orderPage.getContent();

        // 如果返回的 List 为空, 直接返回空数组, 表示没有订单被查询到
        if (CollectionUtils.isEmpty(ecommerceOrders)) {
            return new PageSimpleOrderDetail(Collections.emptyList(), false);
        }

        // 获取当前订单中所有的 goodsId, 这个 Set 不可能为空
        Set<Long> goodsIdsInOrders = new HashSet<>();
        ecommerceOrders.forEach(
                o -> {
                    List<DeductGoodsInventory> goodsAndCount = JSON.parseArray(
                            o.getOrderDetail(), DeductGoodsInventory.class);
                    goodsIdsInOrders.addAll(goodsAndCount.stream()
                            .map(DeductGoodsInventory::getGoodsId)
                            .collect(Collectors.toList()));
                }
        );

        // 是否还有更多页: 总页数是否大于当前给定的页
        boolean hasMaore = orderPage.getTotalPages() > page;
        List<SimpleGoodsInfo> goodsInfos = securedGoodsClient.getSimpleGoodsInfoByTableId(
                new TableId(goodsIdsInOrders.stream()
                        .map(TableId.Id::new)
                        .collect(Collectors.toList()))
        ).getData();
        AddressInfo addressInfo = addressClient.getAddressInfoByTableId(
                new TableId(
                        ecommerceOrders.stream()
                                .map(o -> new TableId.Id(o.getAddressId()))
                                .distinct() // 去重
                                .collect(Collectors.toList())
                )
        ).getData();

        // 组装订单中的商品, 地址信息 -> 订单信息
        return new PageSimpleOrderDetail(
                assembleSimpleOrderDetail(ecommerceOrders, goodsInfos, addressInfo),
                hasMaore
        );
    }

    /**
     * 组装订单详情
     */
    private List<PageSimpleOrderDetail.SingleOrderItem> assembleSimpleOrderDetail(
            List<EcommerceOrder> orders,
            List<SimpleGoodsInfo> goodsInfos,
            AddressInfo addressInfo
    ) {
        // goodsId -> SimpleGoodsInfo
        Map<Long, SimpleGoodsInfo> idToGoodsInfo = goodsInfos.stream()
                .collect(Collectors.toMap(SimpleGoodsInfo::getId, Function.identity()));
        // addressId -> addressInfo
        Map<Long, AddressInfo.AddressItem> idToAddressItem = addressInfo.getAddressItems()
                .stream()
                .collect(Collectors.toMap(AddressInfo.AddressItem::getId, Function.identity()));

        List<PageSimpleOrderDetail.SingleOrderItem> result = new ArrayList<>(orders.size());
        orders.forEach(
                o -> {
                    PageSimpleOrderDetail.SingleOrderItem orderItem =
                            new PageSimpleOrderDetail.SingleOrderItem();
                    orderItem.setId(o.getId());
                    orderItem.setUserAddress(idToAddressItem.getOrDefault(o.getAddressId(),
                            new AddressInfo.AddressItem(-1L)).toUserAddress());
                    orderItem.setGoodsItems(buildOrderGoodsItem(o, idToGoodsInfo));
                    result.add(orderItem);
                }
        );
        return result;
    }

    /**
     * 构造订单中的商品信息
     */
    private List<PageSimpleOrderDetail.SingleOrderGoodsItem> buildOrderGoodsItem(
            EcommerceOrder order, Map<Long, SimpleGoodsInfo> idToGoodsInfo
    ) {
        List<PageSimpleOrderDetail.SingleOrderGoodsItem> goodsItems = new ArrayList<>();
        List<DeductGoodsInventory> goodsAndCount = JSON.parseArray(
                order.getOrderDetail(), DeductGoodsInventory.class
        );
        goodsAndCount.forEach(
                gc -> {
                    PageSimpleOrderDetail.SingleOrderGoodsItem goodsItem =
                            new PageSimpleOrderDetail.SingleOrderGoodsItem();
                    goodsItem.setCount(gc.getCount());
                    goodsItem.setSimpleGoodsInfo(idToGoodsInfo.getOrDefault(
                            gc.getGoodsId(), new SimpleGoodsInfo(-1L)));
                    goodsItems.add(goodsItem);
                }
        );
        return goodsItems;
    }
}
